Task Class

The Task class represents a single task with full execution details and event streaming capabilities. It provides methods for monitoring task progress, streaming events, and managing task lifecycle.

Properties

id

id: str
Unique identifier for the task.

agent_id

agent_id: str
Identifier for the associated agent.

status

status: AgentExecutionStatus
Current execution status of the task.

internal_status

internal_status: Optional[str]
Optional internal status field that customers can use to report their own status alongside the standard task status. Limited to 255 characters.

input

input: AgentExecutionInput
Input parameters provided to the task.

result

result: Optional[str]
Execution result if the task has completed.

events_streaming

events_streaming: bool
Whether real-time event streaming is enabled for this task.

created_at

created_at: datetime
Timestamp when the task was created.

updated_at

updated_at: datetime
Timestamp when the task was last updated.

output_format

output_format: Optional[OutputFormat]
Configured output format for the task results.

output_schema

output_schema: Optional[Dict]
JSON schema for output validation, if specified.
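
As a rough illustration of what an output_schema value could look like, the sketch below builds a small JSON Schema as a plain Python dict and checks a result string against its required keys. The schema contents and the missing_required_keys helper are hypothetical and not part of the SDK; this page does not state how the platform itself validates results.

```python
import json

# Hypothetical schema for a task that returns a sales summary (assumption).
sales_summary_schema = {
    "type": "object",
    "properties": {
        "total_revenue": {"type": "number"},
        "top_products": {"type": "array", "items": {"type": "string"}},
    },
    "required": ["total_revenue", "top_products"],
}


def missing_required_keys(result_json: str, schema: dict) -> list[str]:
    """Return the required top-level keys absent from a JSON result string."""
    payload = json.loads(result_json)
    return [key for key in schema.get("required", []) if key not in payload]


# A result that omits one of the required keys.
print(missing_required_keys('{"total_revenue": 1250.5}', sales_summary_schema))
```

A check like this only covers top-level required keys; full JSON Schema validation would need a dedicated validator.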

Class Methods

aload(task_id, configuration=None)

Asynchronously load a task by its ID.
@classmethod
async def aload(
    cls, 
    task_id: str, 
    configuration: Optional[Configuration] = None
) -> Task
Parameters:
  • task_id (str): Unique identifier for the task
  • configuration (Optional[Configuration]): SDK configuration
Returns: Complete Task object with full details.
Example:
task = await Task.aload("task-456")
print(f"Task status: {task.status}")
Raises:
  • [ModuleException](/API reference/exceptions): If the task is not found or access is denied.

load(task_id, configuration=None)

Synchronously load a task by its ID.
@classmethod
def load(
    cls, 
    task_id: str, 
    configuration: Optional[Configuration] = None
) -> Task
Parameters: Same as aload()
Returns: Complete Task object.
Example:
task = Task.load("task-456")
print(f"Task status: {task.status}")

Instance Methods

aset_status(status)

Asynchronously change the task’s execution status.
async def aset_status(status: AgentExecutionStatus) -> None
Parameters:
  • status (AgentExecutionStatus): New status for the task
Example:
await task.aset_status(AgentExecutionStatus.InProgress)
print(f"Task status updated to: {task.status}")
Related: [AgentExecutionStatus](/API reference/types#agentexecutionstatus)

set_status(status)

Synchronously change the task’s execution status.
def set_status(status: AgentExecutionStatus) -> None
Parameters: Same as aset_status()
Example:
task.set_status(AgentExecutionStatus.Completed)

asave()

Asynchronously save task changes back to the xpander platform.
async def asave() -> Task
Returns: Updated Task object with server-side changes.
Example:
task.result = "Processing completed successfully"
updated_task = await task.asave()
print(f"Task saved with result: {updated_task.result}")
Raises:
  • [ModuleException](/API reference/exceptions): If the save operation fails.

save()

Synchronously save task changes.
def save() -> Task
Returns: Updated Task object.
Example:
task.result = "Processing completed successfully"
updated_task = task.save()

astop()

Asynchronously stop the task execution.
async def astop() -> Task
Returns: Stopped Task object with updated status.
Example:
stopped_task = await task.astop()
print(f"Task {stopped_task.id} has been stopped")
print(f"Final status: {stopped_task.status}")
Raises:
  • [ModuleException](/API reference/exceptions): If the task cannot be stopped.

stop()

Synchronously stop the task execution.
def stop() -> Task
Returns: Stopped Task object.
Example:
stopped_task = task.stop()
print(f"Task stopped with status: {stopped_task.status}")

aevents()

Asynchronously stream task events in real-time.
async def aevents() -> AsyncGenerator[TaskUpdateEvent, None]
Returns: Async generator yielding TaskUpdateEvent objects.
Example:
async for event in task.aevents():
    print(f"Event: {event.type} at {event.time}")
    print(f"Data: {event.data}")
    
    if event.type == TaskUpdateEventType.TaskFinished:
        print("Task completed!")
        break
Related: [TaskUpdateEvent](/API reference/types#taskupdateevent), [TaskUpdateEventType](/API reference/types#taskupdateeventtype)
Note: This method requires events_streaming=True when creating the task.
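
Because streaming depends on how the task was created, it can help to check the events_streaming property before iterating. The following is a minimal sketch of such a guard; ensure_streaming_enabled is a hypothetical helper, not an SDK function, and the SimpleNamespace object only stands in for a real Task.

```python
from types import SimpleNamespace


def ensure_streaming_enabled(task) -> None:
    """Raise early if the task was created without event streaming."""
    if not getattr(task, "events_streaming", False):
        raise RuntimeError(
            f"Task {task.id} was created without events_streaming=True"
        )


# Stand-in object for illustration; it mirrors the id and events_streaming
# properties documented above.
demo_task = SimpleNamespace(id="task-456", events_streaming=True)
ensure_streaming_enabled(demo_task)  # passes silently when streaming is on
```

Calling the guard before task.aevents() or task.events() turns a silent misconfiguration into an explicit error.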

events()

Synchronously stream task events.
def events() -> Generator[TaskUpdateEvent, None, None]
Returns: Generator yielding TaskUpdateEvent objects.
Example:
for event in task.events():
    print(f"Event: {event.type}")
    if event.type == "task_finished":
        break
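
The examples on this page compare event.type both against TaskUpdateEventType.TaskFinished and against the string "task_finished". If you are unsure which form your SDK version yields, one option is to normalize the value before comparing. The sketch below assumes the enum's value is the same string used in the examples; event_type_name and DemoEventType are hypothetical names introduced only for illustration.

```python
from enum import Enum


def event_type_name(event_type) -> str:
    """Return the string form of an event type given as an enum member or str."""
    if isinstance(event_type, Enum):
        return str(event_type.value)
    return str(event_type)


# Hypothetical stand-in for the SDK enum, used only to exercise the helper.
class DemoEventType(str, Enum):
    TaskFinished = "task_finished"


# Both forms normalize to the same string.
print(event_type_name(DemoEventType.TaskFinished))
print(event_type_name("task_finished"))
```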

get_files()

Get PDF files from task input, formatted for Agno integration.
def get_files() -> list[Any]
Returns: List of Agno File objects (when Agno is available) or URL strings. Returns an empty list if no PDF files are present.
Example:
files = task.get_files()
result = await agno_agent.arun(
    input=task.to_message(),
    files=files
)

get_images()

Get image files from task input, formatted for Agno integration.
def get_images() -> list[Any]
Returns: List of Agno Image objects (when Agno is available) or URL strings. Returns an empty list if no image files are present.
Example:
images = task.get_images()
result = await agno_agent.arun(
    input=task.to_message(),
    images=images
)

get_human_readable_files()

Get human-readable files from task input with their content.
def get_human_readable_files() -> list[dict[str, str]]
Returns: List of dictionaries with 'url' and 'content' keys. Returns an empty list if no human-readable files are present.
Example:
readable_files = task.get_human_readable_files()
for file_data in readable_files:
    print(f"File: {file_data['url']}")
    print(f"Content: {file_data['content']}")

to_message()

Convert task input to a formatted message string.
def to_message() -> str
Returns: Formatted message string including text, file URLs, and readable file content.
Example:
message = task.to_message()
result = await agno_agent.arun(input=message)

Usage Examples

Complete Task Lifecycle Management

from xpander_sdk import Agent
from xpander_sdk.models.shared import OutputFormat

async def manage_task_lifecycle():
    # Create task with event streaming
    agent = await Agent.aload("agent-123")
    task = await agent.acreate_task(
        prompt="Analyze sales data",
        events_streaming=True,
        output_format=OutputFormat.Json
    )
    
    print(f"Created task: {task.id}")
    print(f"Initial status: {task.status}")
    
    # Monitor task events
    async for event in task.aevents():
        print(f"Event: {event.type}")
        
        if event.type == "task_finished":
            # Reload task to get final result
            final_task = await task.aload(task.id)
            print(f"Final result: {final_task.result}")
            break
        elif event.type == "tool_call_request":
            tool_call = event.data
            print(f"Tool requested: {tool_call.tool_name}")

Task Status Management

from xpander_sdk import Task
from xpander_sdk.models.shared import AgentExecutionStatus

async def update_task_status():
    # Load existing task
    task = await Task.aload("task-456")
    
    # Update status
    await task.aset_status(AgentExecutionStatus.InProgress)
    
    # Add custom result
    task.result = "Custom processing completed"
    
    # Set custom internal status for tracking
    task.internal_status = "Data validation completed"
    
    # Save changes
    updated_task = await task.asave()
    print(f"Task updated: {updated_task.status}")
    print(f"Internal status: {updated_task.internal_status}")

Internal Status Tracking

The internal_status field carries optional custom context and is limited to 255 characters. Set it once, before the task is saved or returned, so the value is persisted with the save.
from xpander_sdk import Tasks

async def track_internal_status():
    tasks = Tasks()
    
    # Create task
    task = await tasks.acreate(
        agent_id="agent-123",
        prompt="Process customer orders"
    )
    
    # Your processing logic here
    order_count = 0
    # ... process customer orders ...
    order_count = 25  # Example: processed 25 orders
    
    # Set result
    task.result = f"Successfully processed {order_count} customer orders"
    
    # Optional: Set internal status once before saving (max 255 characters)
    task.internal_status = f"Order processing completed: {order_count} orders validated and shipped"
    await task.asave()  # internal_status is persisted here
    
    print(f"Task Status: {task.status}")           # e.g., "completed"
    print(f"Internal Status: {task.internal_status}")  # "Order processing completed: 25 orders validated and shipped"
    
    # Example: Handle character limit validation (255 characters max)
    detailed_status = f"Comprehensive order processing completed with validation, payment processing, and shipping for {order_count} orders including special handling for priority customers"
    if len(detailed_status) > 255:
        task.internal_status = detailed_status[:255]  # Truncate to fit limit
    else:
        task.internal_status = detailed_status
    
    await task.asave()  # Save the final internal status
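
The truncation step in the example above can be factored into a small reusable helper. This is a minimal sketch: clamp_internal_status is a hypothetical function, not part of the SDK, and it assumes the 255 limit counts characters, as stated for the field. It appends "..." so that a shortened status is recognizable as such.

```python
INTERNAL_STATUS_LIMIT = 255  # limit stated for the internal_status field


def clamp_internal_status(text: str, limit: int = INTERNAL_STATUS_LIMIT) -> str:
    """Shorten text to at most `limit` characters, marking cuts with '...'."""
    if len(text) <= limit:
        return text
    return text[: limit - 3] + "..."


# A 300-character status is cut down to exactly the limit.
print(len(clamp_internal_status("x" * 300)))
```

With such a helper, the assignment becomes task.internal_status = clamp_internal_status(detailed_status).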

Error Handling with Tasks

from xpander_sdk import Task
from xpander_sdk.exceptions import ModuleException

async def safe_task_operations():
    try:
        # Attempt to load task
        task = await Task.aload("non-existent-task")
    except ModuleException as e:
        if e.status_code == 404:
            print("Task not found")
        else:
            print(f"Error loading task: {e.description}")
        return
    
    try:
        # Attempt to stop task
        await task.astop()
        print("Task stopped successfully")
    except ModuleException as e:
        print(f"Failed to stop task: {e.description}")
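
If several call sites handle the same exception, the message formatting can live in one place. The sketch below relies only on the status_code and description attributes used in the example above and reads them defensively; describe_error and DemoModuleException are hypothetical names, with the latter standing in for ModuleException so the snippet runs without the SDK.

```python
def describe_error(exc: Exception) -> str:
    """Build a log-friendly message from optional status_code/description attributes."""
    status_code = getattr(exc, "status_code", None)
    description = getattr(exc, "description", None) or str(exc)
    if status_code == 404:
        return "Task not found"
    if status_code is not None:
        return f"Request failed ({status_code}): {description}"
    return f"Request failed: {description}"


# Hypothetical stand-in for ModuleException, used only to exercise the helper.
class DemoModuleException(Exception):
    def __init__(self, status_code: int, description: str):
        super().__init__(description)
        self.status_code = status_code
        self.description = description


print(describe_error(DemoModuleException(404, "missing")))
print(describe_error(DemoModuleException(500, "server error")))
```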

Batch Task Monitoring

import asyncio

from xpander_sdk import Tasks
from xpander_sdk.models.shared import AgentExecutionStatus

async def monitor_multiple_tasks():
    tasks = Tasks()
    
    # Get all tasks for an agent
    task_list = await tasks.alist("agent-123")
    
    # Load full details for running tasks
    running_tasks = []
    for task_item in task_list:
        full_task = await task_item.aload()
        if full_task.status == AgentExecutionStatus.InProgress:
            running_tasks.append(full_task)
    
    print(f"Monitoring {len(running_tasks)} running tasks")
    
    # Monitor all running tasks concurrently
    async def monitor_task(task):
        async for event in task.aevents():
            print(f"Task {task.id}: {event.type}")
            if event.type == "task_finished":
                break
    
    # Start monitoring all tasks
    await asyncio.gather(*[monitor_task(task) for task in running_tasks])
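
When monitoring many tasks at once, a single stalled stream would keep asyncio.gather waiting indefinitely. One possible variation, sketched below, bounds each monitor with asyncio.wait_for. The monitor_with_timeout helper and the DemoTask class are hypothetical; DemoTask only mimics aevents() so the snippet runs without the SDK, and the "task_finished" string follows the examples on this page.

```python
import asyncio
from types import SimpleNamespace


async def monitor_with_timeout(task, timeout_seconds: float) -> str:
    """Follow a task's events until it finishes or the timeout expires."""

    async def follow() -> str:
        async for event in task.aevents():
            if event.type == "task_finished":
                return "finished"
        return "stream_closed"

    try:
        return await asyncio.wait_for(follow(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return "timed_out"


class DemoTask:
    """Hypothetical stand-in that mimics Task.aevents() for the sketch."""

    def __init__(self, task_id: str, delay: float):
        self.id = task_id
        self.delay = delay

    async def aevents(self):
        # Simulate a task that finishes after `delay` seconds.
        await asyncio.sleep(self.delay)
        yield SimpleNamespace(type="task_finished")


async def main() -> list[str]:
    demo_tasks = [DemoTask("fast", 0.01), DemoTask("slow", 1.0)]
    return await asyncio.gather(
        *(monitor_with_timeout(t, timeout_seconds=0.2) for t in demo_tasks)
    )


results = asyncio.run(main())
print(results)
```

Each monitor returns a short status string instead of raising, so one slow task does not prevent the others from reporting.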

File Handling with Agno Integration

from xpander_sdk import Tasks, Backend
from agno.agent import Agent

async def process_task_with_files():
    # Create task with various file types
    tasks = Tasks()
    task = await tasks.acreate(
        agent_id="agent-123",
        prompt="Analyze the attached documents and images",
        file_urls=[
            "https://example.com/report.pdf",      # PDF document
            "https://example.com/chart.png",       # Image file
            "https://example.com/data.csv",        # Human-readable file
            "https://example.com/logo.jpg"         # Another image
        ]
    )
    
    # Get files categorized by type for Agno integration
    files = task.get_files()    # PDF files as Agno File objects
    images = task.get_images()  # Image files as Agno Image objects
    readable_files = task.get_human_readable_files()  # Text files with content
    
    print(f"Task {task.id} has:")
    print(f"  {len(files)} PDF files")
    print(f"  {len(images)} image files")
    print(f"  {len(readable_files)} readable files")
    
    # Initialize Agno agent with xpander backend
    backend = Backend()
    agno_agent = Agent(**backend.get_args())
    
    # Pass files and images directly to Agno
    result = await agno_agent.arun(
        input=task.to_message(),  # Includes text + file URLs + readable content
        files=files,              # PDF files as Agno File objects
        images=images            # Image files as Agno Image objects
    )
    
    print(f"Analysis result: {result.content}")
    
    # Access individual readable file contents if needed
    for file_data in readable_files:
        print(f"\nFile: {file_data['url']}")
        print(f"Content preview: {file_data['content'][:200]}...")

Related:
  • [Agent](/API reference/agents/API reference/agent): Agent that creates and manages tasks
  • [AgentExecutionStatus](/API reference/types#agentexecutionstatus): Task execution statuses
  • [TaskUpdateEvent](/API reference/types#taskupdateevent): Event data structure
  • [TaskUpdateEventType](/API reference/types#taskupdateeventtype): Event type enumeration
  • [AgentExecutionInput](/API reference/types#agentexecutioninput): Task input configuration
  • [Agents Module](/API reference/agents): Create and manage agents
  • [Events Module](/API reference/events): Event-driven programming
  • [Tools Repository](/API reference/tools): Tool integration and management